Skip to content

Feature/composite workflow execution v1#1

Open
stevanbz wants to merge 18 commits intofeature/composite-workflow-transport-crud-executionfrom
feature/composite-workflow-execution-v1
Open

Feature/composite workflow execution v1#1
stevanbz wants to merge 18 commits intofeature/composite-workflow-transport-crud-executionfrom
feature/composite-workflow-execution-v1

Conversation

@stevanbz
Copy link
Copy Markdown
Owner

Issue #, if available:

Description of changes:

  • Created WorkflowRunner logic that iterates through the list of the monitors and sequentially executes monitor depending of the type
  • Created logic for getting the chained monitor finding doc ids (two steps) -> 1. getting the finding docIds per execution (using the workflowRunContext) 2. Based on the docIds used in the first step relevant documents are determined for the currently processed monitor (Relevant classes: WorkflowService, CompositeWorkflowRunner, WorkflowRunContext - which is instantiated every time run happens)
  • Created TransportExecuteWorkflowAction and RestExecuteWorkflowAction
  • Added integration tests that are testing the workflow execution. WorkflowRunnerIT.test execute workflow with custom alerts and finding index with doc level and bucket level delegates can be ignored because in AlertingSingleNode test cases it's very hard to create bucket level monitors since this test suite is mocking ScriptService which is responsible for loading Scripts

CheckList:
[ ] Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…enario

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
@eirsep
Copy link
Copy Markdown

eirsep commented Feb 20, 2023

can you look into adding this painless script module at plugin load test
there will be some method you can override and register the necessary plugin

Let's look into how we can verify composite monitors containing bucket level monitors

…nitor index is not initialized yet. Added workflow crud test cases

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Copy link
Copy Markdown

@eirsep eirsep left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, Stevan
have reviewed 50% of the PR
will review more while you can address the comments

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's discuss offline about cluster/node level settings for composite workflows

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sounds good.

periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowExecutionContext: WorkflowRunContext?
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: name the variable same as the type

else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))

if (workflowRunContext != null && !workflowRunContext.indexToDocIds.isNullOrEmpty()) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we applying this logic here and not in InputService where the actual search query is being executed ?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah you are right. This logic can be removed from here - I forgot to remove it once I added in input service. Tnx and good catch!

}

// If monitor execution is triggered from a workflow
val indexToRelatedDocIdsMap = workflowRunContext?.indexToDocIds
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we intialize this just before its usage instead of here?

}
}

private fun updateInputQueryWithFindingDocIds(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments/javadocs to explain what we intend to do wherever we are using chained findings filtering


// Rewrite query to consider the doc ids per given index
if (chainedFindingExist(indexToDocIds)) {
val updatedSourceQuery = updateInputQueryWithFindingDocIds(input.query.query(), indexToDocIds!!)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null check for query required?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing at input query
add a filter after search query is constructed

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null check for query required?

You are right. Adding the null check.

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing at input query add a filter after search query is constructed

Since rewrittenQuery.query() returns QueryBuilder()! (which can be null) we must do a cast to a BoolQueryBuilder (I guess) which then later we would need to set again to a rewrittenQuery.query.
You can see here that later on query is transformed in a String so it wouldn't be so straight forward to add a filter.
I don't have any more idea how to do this in elegant way (maybe lacking domain knowledge around the OpenSearch classes I can use for this purpose)- if you can give me a hint or a code snippet how I can do, it would be good. Tnx!

}
}

private fun updateInputQueryWithFindingDocIds(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be a common methods used in all the monitor types

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the same - but then I saw that the search query is executed on a different way depending of the monitor type.
Ie. here you can see how the doc level monitor is getting the matching docs. So, for example, doc level monitor iterates through the list of indices and getting the documents index by index. That's why I adjusted getting the matched docIds on doc level monitor to be aligned with existing logic. Check it out here

val xContentRegistry: NamedXContentRegistry,
) {

suspend fun getFindingDocIdsPerMonitorExecution(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getFindingDocIdsByExecutionId*

.seqNoAndPrimaryTerm(true)
)
.indices(chainedMonitor.dataSources.findingsIndex)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle indexNotFound

return buildMonitors(searchResponse)
}

private fun buildMonitors(response: SearchResponse): List<Monitor> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this function be called parseMonitors

return monitors
}

suspend fun getDocIdsPerFindingIndex(monitorId: String, workflowExecutionId: String): Map<String, List<String>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function will be removed since it's not used at all.

…e workflow

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
… consider the workflow execution id

Added worfklow service used for retrieving monitors and their findings. Added business logic for considering the chained monitors

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
…when loading the cluster

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>

class ExecuteWorkflowRequest : ActionRequest {
val dryrun: Boolean
val requestEnd: TimeValue
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is requestEnd?

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy-paste of ExecuteMonitorRequest. Used in CompositeWorkflowRunner - and passed to concrete monitor runner - ie in bucketLevelMonitors used for defining the search params when creating findings. Check it out here

import org.opensearch.commons.alerting.model.Workflow
import java.io.IOException

class ExecuteWorkflowRequest : ActionRequest {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadocs for field

)

override fun validate(): ActionRequestValidationException? {
return null
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validations?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added check. Tnx and good point


class ExecuteWorkflowResponse : ActionResponse, ToXContentObject {

val workflowRunResult: List<MonitorRunResult<*>>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should store other fields like workflow execution start, and end time, status=failed, successful

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will add those fields and appropriate logic around them

return listOf()
}

override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need replacedRoutes. we are not replacing routes. this would be a new API

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing class completely. Sorry

TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
val workflowExecutionId = UUID.randomUUID().toString()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should make this execution id more deterministic..
workflowId+timestamp

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like:
val workflowExecutionId = UUID.randomUUID().toString() + LocalDateTime.now()
What do you think?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to something like:
val executionId = workflow.id.plus(LocalDateTime.now()).plus(UUID.randomUUID().toString())

): MonitorRunResult<*> {
TODO("Not yet implemented")
}
): List<MonitorRunResult<*>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should return workflowRunResult which should contain list of monitorRunResult

return indexToRelatedDocIdsMap
}

suspend fun searchMonitors(monitors: List<String>, size: Int, owner: String?): List<Monitor> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to getMonitorsById

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is owner field used?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No will remove it. Good catch


val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
// Fetch monitors by ids
val monitors = monitorCtx.workflowService!!.searchMonitors(delegates.map { it.monitorId }, delegates.size, workflow.owner)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need owner field?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't. Removing

// Validate the monitors size
if (delegates.size != monitors.size) {
val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString()
throw IllegalStateException("Delegate monitors don't exist $diffMonitorIds")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plz also log workflow id in the message

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Also will add a logs on the beginning and end of workflow execution

…esponse class

Code adjusted according to comments

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
@stevanbz stevanbz force-pushed the feature/composite-workflow-execution-v1 branch from d94c257 to a1e0408 Compare February 27, 2023 22:48
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
var indexToDocIds = mapOf<String, List<String>>()
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with alerting exception

* @param chainedMonitor Monitor that is previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map<String, List<String>> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle indexNotFound and return empty

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking and let me elaborate a little bit my thinking and proposed solution:
Let's catch all the exceptions that can be raised, and wrap them up in AlertingException (check it out here). The caller function - the function in CompositeWorkflowRunner (here) will do a check and return empty workflow run result. What do you think?

?: throw IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id")
if (delegate.chainedFindings != null) {
val chainedMonitor = monitorsById[delegate.chainedFindings!!.monitorId]
?: throw IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrap with alerting exception

dryRun,
workflowRunContext
)
} else {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: use else if for query level and throw unsupported exception

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I also wrap into alerting exception or? Ie. something like this:
Something like this:
else if(delegateMonitor.isQueryLevelMonitor()){ QueryLevelMonitorRunner.runMonitor( delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext ) } else { throw AlertingException.wrap( IllegalStateException("Unsupported monitor type") ) }

data class WorkflowRunContext(
val chainedMonitorId: String?,
val workflowExecutionId: String,
val indexToDocIds: Map<String, List<String>>
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indexToDocIds is not a good variable name. Someone reading the code would not understand that this is the input source.

Copy link
Copy Markdown
Owner Author

@stevanbz stevanbz Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about "matchingDocIdsPerIndex"?

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
@eirsep
Copy link
Copy Markdown

eirsep commented Mar 8, 2023

Let's have latestRunTime and latestExecutionId in workflow object or workflow metadata object.

…dation if the query monitor is part of the workflow chain

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_WORKFLOW_BASE_URI = "/_opendistro/_alerting/workflows"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only for legacy APIs. This is a new API, so we should not have this

stevanbz added 2 commits March 9, 2023 18:47
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
… checking workflow metadata. Changed flow of workflow execution

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
@stevanbz stevanbz force-pushed the feature/composite-workflow-execution-v1 branch from 20de168 to 8e0d28d Compare March 9, 2023 21:17
@stevanbz stevanbz changed the base branch from feature/composite-workflow-v1 to feature/composite-workflow-transport-crud-execution March 9, 2023 22:41
@stevanbz
Copy link
Copy Markdown
Owner Author

can you look into adding this painless script module at plugin load test there will be some method you can override and register the necessary plugin

Let's look into how we can verify composite monitors containing bucket level monitors

@stevanbz stevanbz closed this Mar 13, 2023
@stevanbz stevanbz reopened this Mar 13, 2023
…hat verify that workflow metadata is not created

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
…he monitors once the workflow is updated

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants